package com.example.hotitemanalysis;

import com.example.bean.Order;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.net.URL;

/**
 * Flink streaming job that reads orders from a classpath CSV ("orderId,quantity"
 * per line) and prints two keyed aggregations per order id:
 * the running quantity sum and the running maximum quantity.
 */
public class OrderAnalysis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallelism so printed output is deterministic and ordered.
        env.setParallelism(1);

        // NOTE(review): event time is declared but no timestamp/watermark assigner is
        // installed, so event-time windows would never fire. Harmless here because the
        // job only uses keyBy/sum/reduce (no event-time windowing) — confirm if windows
        // are added later.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // Fail fast with a clear message if the resource is missing, instead of an
        // opaque NullPointerException on url.getPath().
        URL url = OrderAnalysis.class.getResource("/Order.csv");
        if (url == null) {
            throw new IllegalStateException("Resource /Order.csv not found on classpath");
        }
        DataStream<String> inputStream = env.readTextFile(url.getPath());

        // Parse each "orderId,quantity" line; split once instead of twice per record.
        SingleOutputStreamOperator<Order> dataStream = inputStream.map(new MapFunction<String, Order>() {
            @Override
            public Order map(String line) throws Exception {
                String[] fields = line.split(",");
                return new Order(fields[0], Long.valueOf(fields[1]));
            }
        });

        // Running total quantity per order id.
        dataStream.keyBy((KeySelector<Order, String>) Order::getOrderid).sum("quantity").print();

        // Running maximum quantity per order id.
        dataStream.keyBy((KeySelector<Order, String>) Order::getOrderid).reduce(new ReduceFunction<Order>() {
            @Override
            public Order reduce(Order order1, Order order2) throws Exception {
                // Both records carry the same key, so either orderid may be kept.
                return new Order(order2.getOrderid(), Math.max(order1.getQuantity(), order2.getQuantity()));
            }
        }).print();

        env.execute("job- hot item analysis");
    }

}
